package com.galeno.练习

import com.alibaba.fastjson.JSON
import com.galeno.utils.SparkUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/**
 * Daily roll-forward of a device "continuous active range" table (a zipper-style table).
 *
 * History records are comma-separated `deviceId,startDt,endDt` lines; an `endDt` of `9999-12-31`
 * marks a range that is still open. When processing the log of day `logDate`:
 *  - ranges that are already closed are copied through unchanged;
 *  - an open range whose device appears in the day's log stays open;
 *  - an open range whose device does NOT appear in the day's log is closed on the previous day,
 *    because that was the device's last active day;
 *  - a device active on `logDate` without an open range starts a new range `logDate..9999-12-31`.
 *
 * The result is written in the same comma-separated format, so one day's output can be passed
 * back in as the next day's history.
 *
 * @author galeno
 * @date 2021/08/28 16:49
 */
object 连续登录 {

  /** End-date marker for a range that has not been closed yet. */
  private final val OpenEnd = "9999-12-31"

  /**
   * Entry point.
   *
   * Optional positional arguments (the defaults reproduce the original hard-coded setup):
   *  - `args(0)` day being processed, `yyyy-MM-dd` (default `2021-06-07`)
   *  - `args(1)` app log for that day (default `F://applog/app_log_<day>.log`)
   *  - `args(2)` output directory (default `dataout/continout`)
   *  - `args(3)` previous day's range table; when absent, an empty history is used
   */
  def main(args: Array[String]): Unit = {
    import java.time.LocalDate

    val logDate: String = args.lift(0).getOrElse("2021-06-07")
    // Parsing also validates the argument; LocalDate.toString yields the same yyyy-MM-dd format.
    val prevDate: String = LocalDate.parse(logDate).minusDays(1).toString
    val logPath: String = args.lift(1).getOrElse(s"F://applog/app_log_$logDate.log")
    val outputPath: String = args.lift(2).getOrElse("dataout/continout")

    val sc: SparkContext = SparkUtil.getSc

    val activeDevices: RDD[String] = parseActiveDevices(sc.textFile(logPath))
    val historyLines: RDD[String] = args.lift(3).map(sc.textFile(_)).getOrElse(sc.emptyRDD[String])
    val history: RDD[(String, String, String)] = parseHistory(historyLines)

    val result: RDD[(String, String, String)] =
      rollForward(history, activeDevices, logDate, prevDate)

    // Same separator that parseHistory splits on, so the output is reusable as input.
    result
      .map { case (deviceId, startDt, endDt) => s"$deviceId,$startDt,$endDt" }
      .saveAsTextFile(outputPath)
  }

  /**
   * Distinct device ids found in one day's JSON log lines, in a single partition.
   *
   * Blank lines and records without a non-empty id are skipped, so they cannot turn into a bogus
   * `null` device; malformed JSON still fails the job.
   */
  private def parseActiveDevices(logLines: RDD[String]): RDD[String] =
    logLines
      .filter(_.trim.nonEmpty)
      // NOTE(review): key is spelled "devicedId" as in the original; confirm against the log schema.
      .map(line => Option(JSON.parseObject(line)).flatMap(json => Option(json.getString("devicedId"))))
      .collect { case Some(id) if id.nonEmpty => id }
      .distinct(1)

  /** Splits `deviceId,startDt,endDt` history lines into tuples (extra fields are ignored). */
  private def parseHistory(lines: RDD[String]): RDD[(String, String, String)] =
    lines.map { line =>
      val fields: Array[String] = line.split(",")
      (fields(0), fields(1), fields(2))
    }

  /**
   * Applies one day of activity to the range table.
   *
   * @param history  existing ranges `(deviceId, startDt, endDt)`
   * @param active   distinct devices active on `logDate`
   * @param logDate  day being processed, `yyyy-MM-dd`
   * @param prevDate the day before `logDate`; end date for ranges that were not continued
   * @return the full updated table: untouched closed ranges plus the re-evaluated open ones
   */
  private def rollForward(
      history: RDD[(String, String, String)],
      active: RDD[String],
      logDate: String,
      prevDate: String): RDD[(String, String, String)] = {
    val closedRanges = history.filter { case (_, _, endDt) => endDt != OpenEnd }
    val openByDevice = history.filter { case (_, _, endDt) => endDt == OpenEnd }.keyBy(_._1)
    val activeByDevice = active.map(id => (id, id))

    val updatedOpen = openByDevice.fullOuterJoin(activeByDevice).collect {
      // Not seen on logDate: the streak ended the day before.
      case (_, (Some((deviceId, startDt, _)), None)) => (deviceId, startDt, prevDate)
      // Seen again on logDate: the open range simply continues.
      case (_, (Some(range), Some(_))) => range
      // Seen on logDate with no open range: a new streak starts today.
      case (deviceId, (None, Some(_))) => (deviceId, logDate, OpenEnd)
      // (None, None) cannot be produced by a join, so nothing else needs handling.
    }

    closedRanges.union(updatedOpen)
  }
}
